/*
Copyright 2021 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourceinterpreter

import (
	"context"
	"errors"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	corev1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog/v2"

	configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter/customized/declarative"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter/customized/webhook"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter/customized/webhook/request"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/thirdparty"
	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
	"github.com/karmada-io/karmada/pkg/util/helper"
)

// ResourceInterpreter manages both the default interpreters and the customized ones
// (declarative configurations and webhooks) that are used to interpret resource structure.
type ResourceInterpreter interface {
	// Start initializes the resource interpreter and performs cache synchronization.
	Start(ctx context.Context) (err error)

	// HookEnabled reports whether any hook exists for the specific resource type and operation.
	HookEnabled(objGVK schema.GroupVersionKind, operationType configv1alpha1.InterpreterOperation) bool

	// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
	GetReplicas(object *unstructured.Unstructured) (replica int32, replicaRequires *workv1alpha2.ReplicaRequirements, err error)

	// ReviseReplica revises the replica of the given object.
	ReviseReplica(object *unstructured.Unstructured, replica int64) (*unstructured.Unstructured, error)

	// GetComponents extracts the resource requirements for multiple components from the given object.
	// This hook is designed for CRDs with multiple components (e.g., FlinkDeployment), but can
	// also be used for single-component resources like Deployment.
	// If implemented, the controller will use this hook to obtain per-component replica and resource
	// requirements, and will not call GetReplicas.
	// If not implemented, the controller will fall back to GetReplicas for backward compatibility.
	// This hook will only be called when the feature gate 'MultiplePodTemplatesScheduling' is enabled.
	GetComponents(object *unstructured.Unstructured) (components []workv1alpha2.ComponentRequirements, err error)

	// Retain returns an object that is based on the "desired" object but with values retained from the "observed" object.
	Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, err error)

	// AggregateStatus returns an object that is based on 'object' but with its status aggregated
	// from the given aggregatedStatusItems.
	AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error)

	// GetDependencies returns the dependent resources of the given object.
	GetDependencies(object *unstructured.Unstructured) (dependencies []configv1alpha1.DependentObjectReference, err error)

	// ReflectStatus returns the status of the object.
	ReflectStatus(object *unstructured.Unstructured) (status *runtime.RawExtension, err error)

	// InterpretHealth returns the health state of the object.
	InterpretHealth(object *unstructured.Unstructured) (healthy bool, err error)

	// NOTE: placeholder for other common methods.
}

// NewResourceInterpreter builds a new ResourceInterpreter object.
// Only the informer manager and the service lister are recorded here; the underlying
// interpreters are created later by Start.
func NewResourceInterpreter(informer genericmanager.SingleClusterInformerManager, serviceLister corev1.ServiceLister) ResourceInterpreter {
	impl := new(customResourceInterpreterImpl)
	impl.informer = informer
	impl.serviceLister = serviceLister
	return impl
}

// customResourceInterpreterImpl is the ResourceInterpreter implementation returned by
// NewResourceInterpreter. Its methods consult the wrapped interpreters one after another.
type customResourceInterpreterImpl struct {
	// informer and serviceLister are supplied by NewResourceInterpreter. They are handed to
	// the interpreter constructors in Start, and informer is also used by loadConfig to list
	// the existing configurations.
	informer      genericmanager.SingleClusterInformerManager
	serviceLister corev1.ServiceLister

	// The interpreters below are not set by NewResourceInterpreter; they are assigned in Start.
	// configurableInterpreter is loaded with ResourceInterpreterCustomization objects and
	// customizedInterpreter with ResourceInterpreterWebhookConfiguration objects (see loadConfig).
	configurableInterpreter *declarative.ConfigurableInterpreter
	customizedInterpreter   *webhook.CustomizedInterpreter
	thirdpartyInterpreter   *thirdparty.ConfigurableInterpreter
	defaultInterpreter      *native.DefaultInterpreter
}

// Start creates all underlying interpreters, starts the informer, waits for its caches to
// sync, and then loads all ResourceInterpreterCustomization and
// ResourceInterpreterWebhookConfiguration configurations into the cache via loadConfig.
// It is recommended to call it before all controllers: the interpreter fields of the receiver
// are assigned only here. The context argument is currently unused.
// It returns the error from creating the webhook interpreter or from loadConfig, if any.
func (i *customResourceInterpreterImpl) Start(_ context.Context) (err error) {
	klog.Infoln("Starting resource interpreter.")

	// Creating the webhook interpreter is the only construction step that reports an error;
	// on failure nothing else is created and the informer is not started.
	webhookInterpreter, err := webhook.NewCustomizedInterpreter(i.informer, i.serviceLister)
	i.customizedInterpreter = webhookInterpreter
	if err != nil {
		return err
	}

	i.configurableInterpreter = declarative.NewConfigurableInterpreter(i.informer)
	i.thirdpartyInterpreter = thirdparty.NewConfigurableInterpreter()
	i.defaultInterpreter = native.NewDefaultInterpreter()

	// The caches must be synced before loadConfig lists the configuration objects.
	i.informer.Start()
	i.informer.WaitForCacheSync()

	err = i.loadConfig()
	if err != nil {
		return err
	}

	klog.Infoln("Resource interpreter started.")
	return nil
}

// HookEnabled reports whether any hook exists for the specific resource type and operation.
// The default, third-party, declarative and webhook interpreters are asked in that order and
// the first positive answer wins; later interpreters are not consulted after a match.
func (i *customResourceInterpreterImpl) HookEnabled(objGVK schema.GroupVersionKind, operation configv1alpha1.InterpreterOperation) bool {
	switch {
	case i.defaultInterpreter.HookEnabled(objGVK, operation):
		return true
	case i.thirdpartyInterpreter.HookEnabled(objGVK, operation):
		return true
	case i.configurableInterpreter.HookEnabled(objGVK, operation):
		return true
	default:
		return i.customizedInterpreter.HookEnabled(objGVK, operation)
	}
}

// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
// The interpreters are tried in order: declarative, webhook, third-party and finally the default
// one. The first interpreter that either fails or reports an enabled hook determines the result;
// its return values are passed through unchanged.
func (i *customResourceInterpreterImpl) GetReplicas(object *unstructured.Unstructured) (replica int32, requires *workv1alpha2.ReplicaRequirements, err error) {
	var enabled bool

	// Declarative customizations are consulted first.
	replica, requires, enabled, err = i.configurableInterpreter.GetReplicas(object)
	if err != nil || enabled {
		return replica, requires, err
	}

	// Next, the webhook-based interpreter.
	attrs := &request.Attributes{
		Operation: configv1alpha1.InterpreterOperationInterpretReplica,
		Object:    object,
	}
	replica, requires, enabled, err = i.customizedInterpreter.GetReplicas(context.TODO(), attrs)
	if err != nil || enabled {
		return replica, requires, err
	}

	// Then the third-party interpreter.
	replica, requires, enabled, err = i.thirdpartyInterpreter.GetReplicas(object)
	if err != nil || enabled {
		return replica, requires, err
	}

	// Nothing claimed the object: fall back to the default interpreter.
	return i.defaultInterpreter.GetReplicas(object)
}

// ReviseReplica revises the replica of the given object.
// The interpreters are tried in order: declarative, webhook, third-party and finally the default
// one. An error from any of them aborts the chain and yields a nil object; otherwise the result
// of the first interpreter reporting an enabled hook is returned.
// Note that replica is narrowed to int32 for the webhook request without a range check.
func (i *customResourceInterpreterImpl) ReviseReplica(object *unstructured.Unstructured, replica int64) (*unstructured.Unstructured, error) {
	// Declarative customizations are consulted first.
	revised, enabled, err := i.configurableInterpreter.ReviseReplica(object, replica)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return revised, nil
	}

	// Next, the webhook-based interpreter.
	klog.V(4).Infof("Revise replicas for object: %v %s/%s with webhook interpreter.", object.GroupVersionKind(), object.GetNamespace(), object.GetName())
	attrs := &request.Attributes{
		Operation:   configv1alpha1.InterpreterOperationReviseReplica,
		Object:      object,
		ReplicasSet: int32(replica), // #nosec G115: integer overflow conversion int64 -> int32
	}
	revised, enabled, err = i.customizedInterpreter.Patch(context.TODO(), attrs)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return revised, nil
	}

	// Then the third-party interpreter.
	revised, enabled, err = i.thirdpartyInterpreter.ReviseReplica(object, replica)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return revised, nil
	}

	// Nothing claimed the object: fall back to the default interpreter.
	return i.defaultInterpreter.ReviseReplica(object, replica)
}

// GetComponents returns the requirements for each component of the given object.
// This method is intended to extract per-component replica and resource requirements,
// especially for resources that define multiple components (e.g., CRDs with multiple pod templates).
// A nil object is rejected with an error. Only the declarative and the third-party interpreters
// are consulted, in that order; when neither reports an enabled hook a "not implemented" error
// is returned.
func (i *customResourceInterpreterImpl) GetComponents(object *unstructured.Unstructured) ([]workv1alpha2.ComponentRequirements, error) {
	if object == nil {
		return nil, errors.New("nil object")
	}

	// Declarative customizations are consulted first.
	result, enabled, err := i.configurableInterpreter.GetComponents(object)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return result, nil
	}

	// Then the third-party interpreter.
	result, enabled, err = i.thirdpartyInterpreter.GetComponents(object)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return result, nil
	}

	// TODO(@RainbowMango): Implement GetComponents for extracting per-component replica and resource requirements.
	// Follow up tracked by: https://github.com/karmada-io/karmada/issues/6641
	return nil, errors.New("interface GetComponents not implemented yet")
}

// Retain returns an object that is based on the "desired" object but with values retained from
// the "observed" object.
// The interpreters are tried in order: declarative, webhook, third-party and finally the default
// one. An error from any of them aborts the chain and yields a nil object; otherwise the result
// of the first interpreter reporting an enabled hook is returned.
func (i *customResourceInterpreterImpl) Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	// Declarative customizations are consulted first.
	retained, enabled, err := i.configurableInterpreter.Retain(desired, observed)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return retained, nil
	}

	// Next, the webhook-based interpreter.
	klog.V(4).Infof("Retain object: %v %s/%s with webhook interpreter.", desired.GroupVersionKind(), desired.GetNamespace(), desired.GetName())
	attrs := &request.Attributes{
		Operation:   configv1alpha1.InterpreterOperationRetain,
		Object:      desired,
		ObservedObj: observed,
	}
	retained, enabled, err = i.customizedInterpreter.Patch(context.TODO(), attrs)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return retained, nil
	}

	// Then the third-party interpreter.
	retained, enabled, err = i.thirdpartyInterpreter.Retain(desired, observed)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return retained, nil
	}

	// Nothing claimed the object: fall back to the default interpreter.
	return i.defaultInterpreter.Retain(desired, observed)
}

// AggregateStatus returns an object that is based on 'object' but with its status aggregated
// from aggregatedStatusItems.
// The interpreters are tried in order: declarative, webhook, third-party and finally the default
// one. An error from any of them aborts the chain and yields a nil object; otherwise the result
// of the first interpreter reporting an enabled hook is returned.
// The webhook interpreter is given a deep copy of object; the other interpreters receive
// object itself.
func (i *customResourceInterpreterImpl) AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
	// Declarative customizations are consulted first.
	aggregated, enabled, err := i.configurableInterpreter.AggregateStatus(object, aggregatedStatusItems)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return aggregated, nil
	}

	// Next, the webhook-based interpreter.
	klog.V(4).Infof("Aggregate status of object: %v %s/%s with webhook interpreter.", object.GroupVersionKind(), object.GetNamespace(), object.GetName())
	attrs := &request.Attributes{
		Operation:        configv1alpha1.InterpreterOperationAggregateStatus,
		Object:           object.DeepCopy(),
		AggregatedStatus: aggregatedStatusItems,
	}
	aggregated, enabled, err = i.customizedInterpreter.Patch(context.TODO(), attrs)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return aggregated, nil
	}

	// Then the third-party interpreter.
	aggregated, enabled, err = i.thirdpartyInterpreter.AggregateStatus(object, aggregatedStatusItems)
	switch {
	case err != nil:
		return nil, err
	case enabled:
		return aggregated, nil
	}

	// Nothing claimed the object: fall back to the default interpreter.
	return i.defaultInterpreter.AggregateStatus(object, aggregatedStatusItems)
}

// GetDependencies returns the dependent resources of the given object.
// The interpreters are tried in order: declarative, webhook, third-party and finally the default
// one. The first interpreter that either fails or reports an enabled hook determines the result;
// its return values are passed through unchanged.
func (i *customResourceInterpreterImpl) GetDependencies(object *unstructured.Unstructured) (dependencies []configv1alpha1.DependentObjectReference, err error) {
	var enabled bool

	// Declarative customizations are consulted first.
	dependencies, enabled, err = i.configurableInterpreter.GetDependencies(object)
	if err != nil || enabled {
		return dependencies, err
	}

	// Next, the webhook-based interpreter.
	attrs := &request.Attributes{
		Operation: configv1alpha1.InterpreterOperationInterpretDependency,
		Object:    object,
	}
	dependencies, enabled, err = i.customizedInterpreter.GetDependencies(context.TODO(), attrs)
	if err != nil || enabled {
		return dependencies, err
	}

	// Then the third-party interpreter.
	dependencies, enabled, err = i.thirdpartyInterpreter.GetDependencies(object)
	if err != nil || enabled {
		return dependencies, err
	}

	// Nothing claimed the object: fall back to the default interpreter.
	return i.defaultInterpreter.GetDependencies(object)
}

// ReflectStatus returns the status of the object.
// The interpreters are tried in order: declarative, webhook, third-party and finally the default
// one. The first interpreter that either fails or reports an enabled hook determines the result;
// its return values are passed through unchanged.
func (i *customResourceInterpreterImpl) ReflectStatus(object *unstructured.Unstructured) (status *runtime.RawExtension, err error) {
	var enabled bool

	// Declarative customizations are consulted first.
	status, enabled, err = i.configurableInterpreter.ReflectStatus(object)
	if err != nil || enabled {
		return status, err
	}

	// Next, the webhook-based interpreter.
	attrs := &request.Attributes{
		Operation: configv1alpha1.InterpreterOperationInterpretStatus,
		Object:    object,
	}
	status, enabled, err = i.customizedInterpreter.ReflectStatus(context.TODO(), attrs)
	if err != nil || enabled {
		return status, err
	}

	// Then the third-party interpreter.
	status, enabled, err = i.thirdpartyInterpreter.ReflectStatus(object)
	if err != nil || enabled {
		return status, err
	}

	// Nothing claimed the object: fall back to the default interpreter.
	return i.defaultInterpreter.ReflectStatus(object)
}

// InterpretHealth returns the health state of the object.
// The interpreters are tried in order: declarative, webhook, third-party and finally the default
// one. The first interpreter that either fails or reports an enabled hook determines the result;
// its return values are passed through unchanged.
func (i *customResourceInterpreterImpl) InterpretHealth(object *unstructured.Unstructured) (healthy bool, err error) {
	var enabled bool

	// Declarative customizations are consulted first.
	healthy, enabled, err = i.configurableInterpreter.InterpretHealth(object)
	if err != nil || enabled {
		return healthy, err
	}

	// Next, the webhook-based interpreter.
	attrs := &request.Attributes{
		Operation: configv1alpha1.InterpreterOperationInterpretHealth,
		Object:    object,
	}
	healthy, enabled, err = i.customizedInterpreter.InterpretHealth(context.TODO(), attrs)
	if err != nil || enabled {
		return healthy, err
	}

	// Then the third-party interpreter.
	healthy, enabled, err = i.thirdpartyInterpreter.InterpretHealth(object)
	if err != nil || enabled {
		return healthy, err
	}

	// Nothing claimed the object: fall back to the default interpreter.
	return i.defaultInterpreter.InterpretHealth(object)
}

// loadConfig loads the full set of ResourceInterpreterCustomization and
// ResourceInterpreterWebhookConfiguration configurations into the cache. It avoids resource interpreter
// parsing errors when the resource interpreter starts and the cache is not synchronized.
// Each listed object is converted to its typed form before being handed to the declarative
// and webhook interpreters respectively. The first list or conversion failure is logged and
// returned; in that case the interpreter whose configuration failed is not loaded.
func (i *customResourceInterpreterImpl) loadConfig() error {
	// Declarative customizations.
	rawCustomizations, err := i.informer.Lister(util.ResourceInterpreterCustomizationsGVR).List(labels.Everything())
	if err != nil {
		klog.Errorf("Failed to list resourceinterpretercustomizations: %v", err)
		return err
	}
	klog.V(5).Infof("Found %d resourceinterpretercustomizations", len(rawCustomizations))

	declareConfigs := make([]*configv1alpha1.ResourceInterpreterCustomization, 0, len(rawCustomizations))
	for _, raw := range rawCustomizations {
		typed := new(configv1alpha1.ResourceInterpreterCustomization)
		if convertErr := helper.ConvertToTypedObject(raw, typed); convertErr != nil {
			klog.Errorf("Failed to convert resourceinterpretercustomization: %v", convertErr)
			return convertErr
		}
		declareConfigs = append(declareConfigs, typed)
	}
	i.configurableInterpreter.LoadConfig(declareConfigs)

	// Webhook configurations.
	rawWebhooks, err := i.informer.Lister(util.ResourceInterpreterWebhookConfigurationsGVR).List(labels.Everything())
	if err != nil {
		klog.Errorf("Failed to list resourceinterpreterwebhookconfigurations: %v", err)
		return err
	}
	klog.V(5).Infof("Found %d resourceinterpreterwebhookconfigurations", len(rawWebhooks))

	webhookConfigs := make([]*configv1alpha1.ResourceInterpreterWebhookConfiguration, 0, len(rawWebhooks))
	for _, raw := range rawWebhooks {
		typed := new(configv1alpha1.ResourceInterpreterWebhookConfiguration)
		if convertErr := helper.ConvertToTypedObject(raw, typed); convertErr != nil {
			klog.Errorf("Failed to convert resourceinterpreterwebhookconfiguration: %v", convertErr)
			return convertErr
		}
		webhookConfigs = append(webhookConfigs, typed)
	}
	i.customizedInterpreter.LoadConfig(webhookConfigs)

	return nil
}
